package com.persagy.iot.utils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{Admin, Connection, ConnectionFactory, Get, Put, Result, ResultScanner, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes

import java.util
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.util.Try

object HbaseUtil {

  val KAFKA_TOPIC = "first"

  val FAMILY_NAME = "data"

  //zookeeper 相关信息
  val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"

  val HBASE_ZOOKEEPER_IP_TEST = "192.168.100.75,192.168.100.84,192.168.100.147"
  val HBASE_ZOOKEEPER_IP_DEV = "192.168.4.7,192.168.4.8,192.168.4.9"

  val HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = "hbase.zookeeper.property.clientPort"
  val HBASE_ZOOKEEPER_PORT = "2181"

  var conn: Connection = _

  def getHbaseConnection(): Connection ={
    val conf: Configuration = HBaseConfiguration.create()
    conf.set(HBASE_ZOOKEEPER_QUORUM, HBASE_ZOOKEEPER_IP_DEV)
    conf.set(HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT, HBASE_ZOOKEEPER_PORT)
    ConnectionFactory.createConnection(conf)
  }

  /**
   *
   * @param conn Hbase 连接
   * @param rowKey Hbase 的 rowKey
   * @param name 表名
   * @param valueMap 数据格式为：Map[family, Map[qualifier, value]],
   */
  def insertData(conn: Connection, rowKey: String, name: String, valueMap: Map[Object, Map[Object, Object]]): Unit ={

    val get = new Get("4rowKey".getBytes)
    val scan = new Scan()
    val table: Table = conn.getTable(TableName.valueOf("test_create"))
    var scanner: ResultScanner = null
    val value: util.Iterator[Result] = scanner.iterator()
    while(value.hasNext){
      val result: Result = value.next()
      val rowKey: String = Bytes.toString(result.getRow)
      for (cell:Cell <- result.listCells().asScala){
        val string: String = Bytes.toString(CellUtil.cloneQualifier(cell))
        string match {
          case "date" => println(rowKey + " : " + string)
          case _ => println("no")
        }

      }
    }
    //获取表连接
//    val tableName = TableName.valueOf(name)
//    val table = conn.getTable(tableName)

    val put = new Put(rowKey.getBytes());
    put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("name"), Bytes.toBytes(valueMap.head._2.head._2.toString))

    Try(table.put(put)).getOrElse(table.close())//将数据写入HBase，若出错关闭table
  }

  /**
   * 创建表
   * @param conn Hbase 连接
   * @param tableName 要创建的表名
   * @param familyNames 建议不超过3个，否则会影响性能
   */
  def createTable(conn: Connection, tableName: String, familyNames: String*): Unit ={

    val admin: Admin = conn.getAdmin
    val tn: TableName = TableName.valueOf(tableName)

    //创建表描述
    val hTableDescriptor = new HTableDescriptor(tn)

    //遍历 familyName，创建列族
    for (familyName <- familyNames) {
      val hColumnDescriptor = new HColumnDescriptor(familyName)
      hTableDescriptor.addFamily(hColumnDescriptor)
    }

    admin.createTable(hTableDescriptor)
  }

  def main(args: Array[String]): Unit = {
    //创建原始数据表
    createTable(getHbaseConnection(), "original_data", "data")
    //创建分精度数据表
    createTable(getHbaseConnection(), "accuracy_data", "data")
  }
}
